[core] Optimization of Parquet Predicate Pushdown Capability #4608
Conversation
Looks very nice! Thanks @Aiden-Dong, I will review it next week.
@Aiden-Dong Can you add a test for parquet page predicate pushdown with deletion vectors enabled? I just want to make sure.
Spark Issue: https://issues.apache.org/jira/browse/SPARK-34859
Indeed, the currentRowPosition index has a problem when deletion vectors are enabled. Let me fix this and add a test.
Based on the error log, it seems that this error is not likely caused by the recent changes.
You can enable Actions in your repo to double-check.
+1 for non-test code
.withIOManager(new IOManagerImpl(tempDir.toString()));

for (int i = 0; i < 2000; i++) {
    write.write(rowData(i, i, i * 100L));
If you want the test data in one bucket for the deletion vector case, just use rowData(0, i, i * 100L); the first column is "pt", the partition column.
for (int i = 0; i < 2000; i++) {
write.write(rowData(i, i, i * 100L));
The code above will write 2000 records in 2000 partitions.
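A minimal sketch of the suggested change (using the same rowData helper as in the test): keeping the first column constant writes every record into a single partition, so one bucket and one deletion vector cover all of them.

// Write all 2000 records into the single partition pt = 0,
// so they end up in one bucket and the deletion vector applies to them.
for (int i = 0; i < 2000; i++) {
    write.write(rowData(0, i, i * 100L));
}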
.newWrite()
        .withIOManager(new IOManagerImpl(tempDir.toString()));
for (int i = 1000; i < 2000; i++) {
    write.write(rowDataWithKind(RowKind.DELETE, i, i, i * 100L));
Same here: rowDataWithKind(RowKind.DELETE, 0, i, i * 100L).
List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());

for (int i = 500; i < 510; i++) {
    TableRead read = table.newRead().withFilter(builder.equal(0, i)).executeFilter();
Maybe builder.equal(1, i) is what you want. A partition predicate will not be pushed down.
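For illustration, a sketch of the suggested filter (same builder and table as in the test); index 1 is the key column here, while index 0 is the partition column "pt", whose predicate would not be pushed down:

// Filter on field index 1 (the key column) instead of index 0 (the partition column),
// so the predicate can actually be pushed down into the Parquet reader.
TableRead read = table.newRead().withFilter(builder.equal(1, i)).executeFilter();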
}

for (int i = 1500; i < 1510; i++) {
    TableRead read = table.newRead().withFilter(builder.equal(0, i)).executeFilter();
ditto
I misunderstood the first column as the primary key... I'll make the correction.
I have revised the unit tests and increased the write cardinality to ensure that there are more pages and RowGroups in the Parquet file.
Thanks for the testing and PR. I downloaded and tested it immediately. Compared with the previous version, this PR increased the speed of Parquet reads by nearly 10 times, which is a huge improvement! On my computer, the Parquet result is 8.4s and the ORC result is 4.1s. @Aiden-Dong @JingsongLi @leaves12138

Table table = TableUtil.getTable(); // PrimaryKeyFileStoreTable
PredicateBuilder builder = new PredicateBuilder(
RowType.of(DataTypes.INT(),
DataTypes.STRING(),
DataTypes.STRING()));
int[] projection = new int[] {0, 1, 2};
ReadBuilder readBuilder = table.newReadBuilder()
.withProjection(projection);
Random random = new Random();
for (int i = 0; i < 30; i++) {
InnerTableRead read = (InnerTableRead)readBuilder.newRead();
int key = random.nextInt(4000000);
Predicate keyFilter = builder.equal(0, key);
InnerTableScan tableScan = (InnerTableScan) readBuilder
.withFilter(keyFilter)
.newScan();
InnerTableScan innerTableScan = tableScan.withFilter(keyFilter);
TableScan.Plan plan = innerTableScan.plan();
List<Split> splits = plan.splits();
read.withFilter(keyFilter);//.executeFilter();
RecordReader<InternalRow> reader = read.createReader(splits);
reader.forEachRemaining(internalRow -> {
int f0 = internalRow.getInt(0);
String f1 = internalRow.getString(1).toString();
String f2 = internalRow.getString(2).toString();
System.out.println(String.format("%d - {%d, %s, %s}",key, f0, f1, f2));
});
}
long startTime = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
InnerTableRead read = (InnerTableRead)readBuilder.newRead();
int key = random.nextInt(4000000);
Predicate keyFilter = builder.equal(0, key);
InnerTableScan tableScan = (InnerTableScan) readBuilder
.withFilter(keyFilter)
.newScan();
InnerTableScan innerTableScan = tableScan.withFilter(keyFilter);
TableScan.Plan plan = innerTableScan.plan();
List<Split> splits = plan.splits();
read.withFilter(keyFilter);//.executeFilter();
RecordReader<InternalRow> reader = read.createReader(splits);
reader.forEachRemaining(internalRow -> {
int f0 = internalRow.getInt(0);
String f1 = internalRow.getString(1).toString();
String f2 = internalRow.getString(2).toString();
System.out.println(String.format("%d - {%d, %s, %s}",key, f0, f1, f2));
});
}
long stopTime = System.currentTimeMillis();
System.out.println("time : " + (stopTime - startTime)); writer see #4586 |
Thanks @ranxianglei for testing, I will follow up on this question.
I found that the default stripe size and row index stride of ORC are twice as large as those of Parquet, which leads to a finer granularity of indexes in ORC.
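For context, a rough sketch of how Parquet granularity can be tuned with plain parquet-mr Hadoop configuration keys; whether Paimon forwards these exact keys is an assumption to verify, and the values below are only illustrative:

import org.apache.hadoop.conf.Configuration;

// Smaller row groups and pages produce more min/max index entries, i.e. finer-grained
// pruning for predicate pushdown, at the cost of extra metadata and smaller I/O units.
Configuration conf = new Configuration();
conf.setLong("parquet.block.size", 64L * 1024 * 1024);   // row group size in bytes
conf.setInt("parquet.page.size", 64 * 1024);             // data page size in bytes
conf.setInt("parquet.page.row.count.limit", 10000);      // cap on rows per page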
@Aiden-Dong Hi, I still have some questions about the pull request. How can I get in touch with you?
It just returns the rowIndex within the current row group, but what we want is the exact row index within the whole Parquet file.
Hold on, let me take a look at this logic.
I've submitted a PR to fix this logic: #4636
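A minimal sketch of the intended semantics (the names here are hypothetical, not the actual code of #4636): the position used for the deletion vector must be absolute within the file, so the row group's starting row index has to be added to the offset inside the group.

// Hypothetical illustration: rowGroupFirstRowIndex is the total number of rows in all
// preceding row groups of the file; rowInGroup is the position inside the current group.
long currentRowPosition(long rowGroupFirstRowIndex, long rowInGroup) {
    // The deletion vector is keyed on the absolute row position in the whole Parquet
    // file, not on the group-local index.
    return rowGroupFirstRowIndex + rowInGroup;
}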
switch (runLenDecoder.mode) {
    case RLE:
        if (runLenDecoder.currentValue == maxDefLevel) {
            skipShot(n);
maybe skipShort?
Purpose
Linked issue: #4586
Optimized the predicate pushdown capability for filtering and reading Parquet files, enhancing the original predicate pushdown from the RowGroup level to the Column Page level, resulting in a significant improvement in query performance.
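As a rough illustration of what page-level pushdown means in plain parquet-mr terms (this is the underlying mechanism, not the Paimon code path added by this PR; the column name "f0" and the file path are placeholders): with column indexes enabled, the reader can use per-page min/max statistics to skip individual pages instead of only whole RowGroups.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

// Placeholder input file; in Paimon this would be the data file of a split.
HadoopInputFile inputFile =
        HadoopInputFile.fromPath(new Path("/tmp/data.parquet"), new Configuration());

// Predicate on the placeholder int column "f0".
FilterPredicate predicate = FilterApi.eq(FilterApi.intColumn("f0"), 42);

ParquetReadOptions options = ParquetReadOptions.builder()
        .withRecordFilter(FilterCompat.get(predicate))
        .useColumnIndexFilter(true) // consult column/offset indexes, prune at page granularity
        .build();

try (ParquetFileReader reader = ParquetFileReader.open(inputFile, options)) {
    // Only row ranges whose pages can contain f0 == 42 are read and decoded;
    // without column index filtering, pruning stops at the RowGroup level.
    long matchingRows = reader.getFilteredRecordCount();
    System.out.println("rows after page-level pruning: " + matchingRows);
}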
Tests
API and Format
Documentation